
#include "event.h"
#include "LoopService.h"
#include "event2/thread.h"
#include "event2/util.h"
#include <assert.h>

#ifdef _WIN32 
struct ws32_init_tmp__ {
    ws32_init_tmp__() {
        WSADATA wsa;
        WSAStartup(MAKEWORD(2, 2), &wsa);
    }
    ~ws32_init_tmp__() {
        WSACleanup();
    }
} __tmp;
#endif

LoopService::LoopService()
	: create_evbase_myself_(true)
	, notified_(false)
	, pending_functor_count_(0)
	, runing_(false)
{
    evbase_ = event_base_new();
	_init();
}

LoopService::LoopService(struct event_base* base)
	: evbase_(base)
	, create_evbase_myself_(false)
	, notified_(false)
	, pending_functor_count_(0)
	, runing_(false)
{
	_init();

	// When we build an EventLoop instance from an existing event_base
	// object, we will never call EventLoop::Run() method.
	// So we need to watch the task queue here.
	auto rc = watcher_->async_wait();
	assert(rc);
	if (!rc) {
        fprintf(stderr, "PipeEventWatcher init failed.\n");
        abort();
	}
	runing_.store(true);
}


LoopService::~LoopService()
{
	if (evbase_ != nullptr && create_evbase_myself_) {
		event_base_free(evbase_);
		evbase_ = nullptr;
	}
}

void LoopService::after_fork()
{
    int rc = event_reinit(evbase_);
    assert(rc == 0);

    if (rc != 0) {
        fprintf(stderr, "event_reinit failed!\n");
        return;
    }

    // We create EventLoopThread and initialize it in father process,
    // but we use it in child process.
    // If we have only one child process, everything goes well.
    //
    // But if we have multi child processes, something goes wrong.
    // Because EventLoop::watcher_ is created and initialized in father process
    // all children processes inherited father's pipe.
    //
    // When we use the pipe to do a notification in one child process
    // the notification may be received by another child process randomly.
    //
    // So we need to reinitialize the watcher_
    _init_notify_pipe_watcher();
}

void LoopService::run()
{
	tid_ = std::this_thread::get_id(); // The actual thread id

	int rc = watcher_->async_wait();
	assert(rc);
	if (!rc) {
        fprintf(stderr, "PipeEventWatcher init failed.\n");
        abort();
	}

	runing_.store(true);
	rc = event_base_dispatch(evbase_);
	if (rc == 1) {
        fprintf(stderr, "event_base_dispatch error: no event registered\n");
        abort();
	}
	else if (rc == -1) {
		int serrno = EVUTIL_SOCKET_ERROR();
        fprintf(stderr, "event_base_dispatch error=%d %s\n", serrno, 
			 evutil_socket_error_to_string(serrno));
	}

	// Make sure watcher_ does construct, initialize and destruct in the same thread.
	watcher_.reset();
}

void LoopService::stop()
{
	post(std::bind(&LoopService::_stop_in_loop, this));
}

void LoopService::dispatch(const void_0_arg_fun& handler)
{
	if (is_running() && is_in_loop_thread()) {
		handler();
	}
	else {
		post(handler);
	}
}

void LoopService::post(const void_0_arg_fun& handler)
{
	{
		std::lock_guard<std::mutex> lock(mutex_);
		pending_functors_.emplace_back(handler);
	}
	++pending_functor_count_;
	if (!notified_.load()) {
		
		// We must set notified_ to true before calling `watcher_->Nodify()`
		// otherwise there is a change that:
		//  1. We called watcher_- > Nodify() on thread1
		//  2. On thread2 we watched this event, so wakeup the CPU changed to run this EventLoop on thread2 and executed all the pending task
		//  3. Then the CPU changed to run on thread1 and set notified_ to true
		//  4. Then, some thread except thread2 call this QueueInLoop to push a task into the queue, and find notified_ is true, so there is no change to wakeup thread2 to execute this task
		notified_.store(true);

		// Sometimes one thread invoke EventLoop::QueueInLoop(...), but anther
		// thread is invoking EventLoop::Stop() to stop this loop. At this moment
		// this loop maybe is stopping and the watcher_ object maybe has been
		// released already.
		if (watcher_) {
			watcher_->notify();
		}
		else {
			assert(!is_running());
		}
	}
	// "No need to call watcher_->Nofity()";
}

void LoopService::_init()
{
	tid_ = std::this_thread::get_id(); // The default thread id

	_init_notify_pipe_watcher();
}

void LoopService::_init_notify_pipe_watcher()
{
	watcher_.reset(new PipeEventWatcher(this, std::bind(&LoopService::_do_pending_functors, this)));
	int rc = watcher_->init();
	assert(rc);
	if (!rc) {
        fprintf(stderr, "PipeEventWatcher init failed.\n");
        abort();
	}
}

void LoopService::_stop_in_loop()
{
	auto f = [this]() {
		for (int i = 0;; i++) {
			_do_pending_functors();
			if (_is_pending_queue_empty()) {
				break;
			}
		}
	};

	f();

	runing_.store(false);
	event_base_loopexit(evbase_, nullptr);
	
	f();
}

void LoopService::_do_pending_functors()
{
	std::vector<void_0_arg_fun> functors;
	{
		std::lock_guard<std::mutex> lock(mutex_);
		notified_.store(false);
		pending_functors_.swap(functors);
	}
	for (size_t i = 0; i < functors.size(); ++i) {
		functors[i]();
		--pending_functor_count_;
	}
}



